package com.xueyuan.wata.daph.node.spark3.dataframe.batch.connector.hbase

import com.xueyuan.wata.daph.spark3.api.node.dataframe.connector.input.DataFrameInput
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.DataFrame

class HBaseInput extends DataFrameInput {
  /** Spark data source format provided by the hbase-spark connector. */
  private final val FORMAT_SOURCE = "org.apache.hadoop.hbase.spark"

  /**
   * Reads a DataFrame from HBase via the hbase-spark connector.
   *
   * The table catalog and any extra HBase options from the node config are
   * applied both to the Hadoop `Configuration` backing the `HBaseContext`
   * and to the `DataFrameReader` options, mirroring the connector's dual
   * configuration paths.
   *
   * @return the DataFrame loaded from HBase
   */
  override protected def in(): DataFrame = {
    val config = nodeConfig.asInstanceOf[HBaseConfig]

    // Mirror catalog + user-supplied options into the Hadoop configuration
    // used to build the HBaseContext.
    val hbaseConfig = HBaseConfiguration.create()
    hbaseConfig.set("catalog", config.catalog)
    config.hbaseOptions.foreach { case (key, value) => hbaseConfig.set(key, value) }

    // NOTE(review): the instance is intentionally discarded — constructing an
    // HBaseContext registers it with the connector (presumably its
    // latest-context cache, which "hbase.spark.use.hbasecontext" relies on);
    // it must exist before load() is called. Confirm against connector docs.
    new HBaseContext(spark.sparkContext, hbaseConfig)

    // Chain the reader options instead of relying on DataFrameReader's
    // in-place mutation with discarded return values.
    spark.read
      .format(FORMAT_SOURCE)
      .option("hbase.spark.use.hbasecontext", true)
      .option("catalog", config.catalog)
      .options(config.hbaseOptions)
      .load()
  }

  /** Config class this node expects; `nodeConfig` is cast to it in [[in]]. */
  override def getNodeConfigClass: Class[HBaseConfig] = classOf[HBaseConfig]
}